package client

import (
	"context"
	"crypto/x509"
	"errors"
	"fmt"
	"gitee.com/zhucheer/pub-connect/app/define"
	pubproto "gitee.com/zhucheer/pub-connect/app/proto"
	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials"
	"io"
	"log"
	"time"
)

// ActiveStreamFun is the callback that is handed the DoWork stream once it has
// been opened. clientStreamDoWork starts it in its own goroutine and passes the
// caller's context, the session ID taken from the client options, the options
// themselves, and a pointer to the stream value returned by DoWork.
type ActiveStreamFun func(c context.Context, sessionId string, clientOpts *VwPubClientOpts, stream *pubproto.PubConnect_DoWorkClient)

// NewStreamClient dials the gRPC server at clientOpts.ServerAddr and runs the
// bidirectional DoWork stream on that connection until the stream ends.
//
// Parameters:
//   - c: context passed on to the DoWork stream and to streamDo.
//   - conn: an optional previous connection. If it is non-nil and not in the
//     Shutdown state the call is rejected with an "is connected" error.
//   - clientOpts: client settings; must be non-nil with a non-empty AppName.
//     A nil StopSign channel is replaced by a buffered channel of size 1.
//   - streamDo: callback that receives the opened stream.
//
// Transport security is enabled when clientOpts.TSL.ServerName is set, using a
// certificate pool built from clientOpts.TSL.CertContent; otherwise the
// connection is insecure.
//
// Once the stream has finished, the connection is closed and the value 1 is
// sent on clientOpts.StopSign (a blocking send). No stop signal is sent when
// validation, certificate loading, or dialing fails.
//
// It returns an error for invalid options, an already active connection, a
// certificate that cannot be parsed, a failed dial (30 second timeout), or an
// error reported by the stream worker.
func NewStreamClient(c context.Context, conn *grpc.ClientConn, clientOpts *VwPubClientOpts, streamDo ActiveStreamFun) (err error) {
	if clientOpts == nil || clientOpts.AppName == "" {
		// Report the misconfiguration to the caller rather than terminating
		// the whole process from library code.
		return errors.New("client options not found")
	}
	if clientOpts.StopSign == nil {
		clientOpts.StopSign = make(chan int, 1)
	}

	if conn != nil {
		log.Println("grpc status: ", conn.GetState())
		if conn.GetState() != connectivity.Shutdown {
			// The previous connection is still in use.
			return errors.New("is connected")
		}
		conn.Close()
	}

	var opts []grpc.DialOption
	if clientOpts.TSL.ServerName != "" {
		cp := x509.NewCertPool()
		if !cp.AppendCertsFromPEM([]byte(clientOpts.TSL.CertContent)) {
			return fmt.Errorf("credentials: failed to append certificates")
		}

		// ServerName is the domain name the CA certificate was signed for.
		creds := credentials.NewClientTLSFromCert(cp, clientOpts.TSL.ServerName)
		opts = append(opts, grpc.WithTransportCredentials(creds))
	} else {
		opts = append(opts, grpc.WithInsecure())
	}

	// WithBlock makes DialContext wait until the connection is up, so the
	// timeout context only matters for the duration of the dial itself.
	opts = append(opts, grpc.WithBlock())
	dialCtx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
	conn, err = grpc.DialContext(dialCtx, clientOpts.ServerAddr, opts...)
	// Release the timeout timer as soon as the blocking dial has returned.
	cancel()
	if err != nil {
		fmt.Println("=====srv addr stream=======", clientOpts.ServerAddr)
		return err
	}

	client := pubproto.NewPubConnectClient(conn)
	log.Printf("connect <==stream==> grpc server at serverAddr %v", clientOpts.ServerAddr)

	err = clientStreamDoWork(c, client, clientOpts, streamDo)

	conn.Close()

	// Publish the connection-closed event after the connection is torn down.
	clientOpts.StopSign <- 1
	fmt.Println("client stream do done", clientOpts.AppName, clientOpts.SessionId)
	return err
}

// clientStreamDoWork opens the bidirectional DoWork stream, sends an initial
// heartbeat message identifying this client, and then handles the messages
// pushed by the server until the stream ends.
//
// For every received message whose AppName and AppUuid match this client:
//   - if the message's PriKey differs from the configured secret, the message
//     is echoed back with its status set to define.StatusFailed;
//   - if it is a "create" message that already carries the failed status, its
//     results are printed and the loop stops after a one second pause;
//   - otherwise opts.RecvHandler is invoked and its result is sent back.
//
// Messages addressed to another application or session are ignored.
//
// streamDo, when non-nil, is started in its own goroutine with a pointer to
// the stream before the heartbeat is sent.
// NOTE(review): this function also sends on the same stream; confirm that
// streamDo does not call Send concurrently with it.
//
// It returns the error from opening the stream, from sending the heartbeat, or
// from a failed Recv. A clean end of stream (io.EOF) and the "create" failure
// path return nil.
func clientStreamDoWork(ctx context.Context, client pubproto.PubConnectClient,
	opts *VwPubClientOpts, streamDo ActiveStreamFun) (err error) {
	appName := opts.AppName
	secret := opts.Secret
	appUuid := opts.SessionId

	// Open the bidirectional stream.
	stream, err := client.DoWork(ctx)
	if err != nil {
		log.Printf("fail to run Do work: %v", err)
		return err
	}

	// Hand the stream to the callback; a nil callback is simply skipped
	// instead of panicking inside the new goroutine.
	if streamDo != nil {
		go streamDo(ctx, opts.SessionId, opts, &stream)
	}

	// Send an initial heartbeat message once the stream is connected.
	err = stream.Send(&pubproto.WorkDetail{
		PriKey:  secret,
		AppName: appName,
		AppUuid: appUuid,
		Name:    define.WorkAgentHeartbeatMsg,
	})
	if err != nil {
		log.Printf("error stream.Send err:%v", err)
		return err
	}

	for {
		// Keep listening for server messages and reply to each one.
		in, recvErr := stream.Recv()
		if errors.Is(recvErr, io.EOF) {
			// The server finished the stream normally.
			log.Printf("read done  EOF")
			return nil
		}
		if recvErr != nil {
			log.Printf("Failed to receive err %v", recvErr)
			// Surface the failure; a separate variable is used so the error
			// is not lost to shadowing of the named result.
			return recvErr
		}

		fmt.Println("=========client recv====", in.Name, in.Uuid, in.AppName)

		// Ignore messages addressed to another application or session.
		if in.AppName != appName || in.AppUuid != appUuid {
			continue
		}

		resp := *in
		if in.PriKey != secret {
			// Mark the reply itself as failed: resp is a copy, so changing
			// only the received message would not reach the server.
			resp.Status = define.StatusFailed
			// The key values are deliberately kept out of the log.
			log.Printf("secret err: request key mismatch, work name: %s, uid: %s", in.Name, in.Uuid)
		} else {
			log.Printf("begin handle work name: %s, uid: %s status is %s, params: %s", in.Name, in.Uuid, in.Status, in.Params)

			if in.Name == "create" && in.Status == define.StatusFailed {
				fmt.Println(in.Results)
				time.Sleep(time.Second)
				return nil
			}

			// Run the requested task; the result is sent back even when the
			// handler reports an error.
			var handlerErr error
			resp, handlerErr = opts.RecvHandler(in)
			if handlerErr != nil {
				log.Printf("handlerDo err: %v", handlerErr)
			}
		}

		// Push the result back to the server.
		resp.ResponseTime = time.Now().Unix()
		if sendErr := stream.Send(&resp); sendErr != nil {
			log.Printf("handlerDo Send err: %v", sendErr)
		}
	}
}
